package com.atguigu.flink.wordcount;

import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count using Flink's DataSet API.
 *
 * <p>Reads a text file line by line, splits each line into whitespace-separated words, emits a
 * {@code (word, 1)} pair per word, groups the pairs by word, sums the counts and prints the
 * result.
 *
 * <p>Created by Smexy on 2022/12/10
 */
public class Demo1_BatchExecution
{
    /** Input file used when no path is passed on the command line (relative to the working dir). */
    private static final String DEFAULT_INPUT_PATH = "data/words.txt";

    /**
     * Separator between words: any run of whitespace (spaces, tabs, ...). Splitting on a single
     * space instead would turn repeated spaces or an empty line into empty-string "words".
     */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /**
     * Runs the word-count job.
     *
     * @param args optional; {@code args[0]} overrides the input path (default
     *     {@code data/words.txt})
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input as a DataSet of lines. readTextFile(path) decodes the file as UTF-8.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataSource<String> dataSource = env.readTextFile(inputPath);

        /*
         * 3. Compute.
         *
         * An interface such as FlatMapFunction can be implemented three ways:
         *   - a named inner class,
         *   - a separate (outer) class,
         *   - an anonymous class, or a lambda since it is a functional interface.
         *
         * An anonymous class is used here. Flink can then read the Tuple2<String, Integer>
         * output type from its generic signature. A lambda would lose that type to erasure and
         * need an explicit .returns(...) hint.
         *
         * Tuple2 is used as a (key, value) pair: f0 = word, f1 = count.
         *
         * The FlatMapOperator returned by flatMap wraps this computation step of the job.
         */
        FlatMapOperator<String, Tuple2<String, Integer>> operator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>()
        {
            @Override
            public void flatMap(String inputLine, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Emit (word, 1) for every whitespace-separated word; the collector gathers output.
                for (String word : WHITESPACE.split(inputLine)) {
                    // A line with leading whitespace, or an empty line, yields an empty token.
                    // Skip it so it is not counted as a word.
                    if (!word.isEmpty()) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            }
        });

        /*
         * Grouping keys:
         *   - Tuple types: groupBy(int... positions). A position is the zero-based index of the
         *     key field within the tuple.
         *   - POJOs (beans): groupBy(String... fields), naming the key property.
         */
        UnsortedGrouping<Tuple2<String, Integer>> grouped = operator.groupBy(0);

        // Sum field 1 (the count) within each word group and print the result. In the DataSet
        // API, print() triggers execution itself, so no env.execute() call follows.
        grouped.sum(1).print();
    }
}
